# -*- coding: utf-8 -*-
from datetime import timedelta
from utils.operators.cluster_for_spark_sql_operator import SparkSqlOperator
from spmi.time_sensor.time_after_04_00 import time_after_04_00
from spmi.ods.mysql.spmi import spmi_ods__spmi_transfer_bill
from airflow.operators.dummy_operator import DummyOperator


# Spark SQL task for the "a" part of dwd_spmi_transfer_bill_new_dt: submits
# execute_a.sql with master 'yarn' on the 'pro' queue.
spmi_dwd__dwd_spmi_transfer_bill_new_dt_a = SparkSqlOperator(
    # --- identity -----------------------------------------------------------
    task_id='spmi_dwd__dwd_spmi_transfer_bill_new_dt_a',
    name='spmi_dwd__dwd_spmi_transfer_bill_new_dt_a_{{ execution_date | date_add(1) | cst_ds }}',
    sql='spmi/dwd/spmi/dwd_spmi_transfer_bill_new_dt/execute_a.sql',
    email=['lukunming@jtexpress.com', 'yl_bigdata@yl-scm.com'],
    # --- scheduling limits --------------------------------------------------
    # No task-level retries; the run is cut off after 100 minutes.
    retries=0,
    execution_timeout=timedelta(minutes=100),
    pool_slots=2,
    # --- cluster resources --------------------------------------------------
    master='yarn',
    yarn_queue='pro',
    driver_cores=2,
    driver_memory='8G',
    executor_cores=5,
    executor_memory='12G',
    num_executors=70,
    # --- Spark settings -----------------------------------------------------
    conf={
        # Dynamic allocation, capped at 90 executors.
        'spark.dynamicAllocation.enabled': 'true',
        'spark.shuffle.service.enabled': 'true',
        'spark.dynamicAllocation.maxExecutors': 90,
        'spark.dynamicAllocation.cachedExecutorIdleTimeout': 120,
        # Overwrite only the partitions that the query actually writes.
        'spark.sql.sources.partitionOverwriteMode': 'dynamic',
        'spark.executor.memoryOverhead': '4G',
        # A single YARN application attempt.
        'spark.yarn.maxAppAttempts': 1,
        'spark.sql.shuffle.partitions': 2000,
        'spark.io.compression.codec': 'org.apache.spark.io.ZStdCompressionCodec',
        # Disabled tuning options, kept for reference:
        # 'spark.shuffle.memoryFraction': '0.8',
        # 'spark.shuffle.file.buffer': '64k',
        # 'spark.executor.extraJavaOptions': '-XX:+UseG1GC -XX:ParallelGCThreads=4 -XX:ConcGCThreads=2'
    },
    # --- Hive settings ------------------------------------------------------
    hiveconf={
        # Dynamic partitioning, non-strict mode.
        'hive.exec.dynamic.partition': 'true',
        'hive.exec.dynamic.partition.mode': 'nonstrict',
        # Maximum number of dynamic partitions (overall and per node).
        'hive.exec.max.dynamic.partitions': 300,
        'hive.exec.max.dynamic.partitions.pernode': 300,
    },
)
# Upstream: the ODS transfer-bill task and the 04:00 time sensor.
spmi_dwd__dwd_spmi_transfer_bill_new_dt_a.set_upstream(
    [spmi_ods__spmi_transfer_bill, time_after_04_00]
)


# Spark SQL task for the "b" part of dwd_spmi_transfer_bill_new_dt: submits
# execute_b.sql with master 'yarn' on the 'pro' queue.
spmi_dwd__dwd_spmi_transfer_bill_new_dt_b = SparkSqlOperator(
    # --- identity -----------------------------------------------------------
    task_id='spmi_dwd__dwd_spmi_transfer_bill_new_dt_b',
    name='spmi_dwd__dwd_spmi_transfer_bill_new_dt_b_{{ execution_date | date_add(1) | cst_ds }}',
    sql='spmi/dwd/spmi/dwd_spmi_transfer_bill_new_dt/execute_b.sql',
    email=['lukunming@jtexpress.com', 'yl_bigdata@yl-scm.com'],
    # --- scheduling limits --------------------------------------------------
    # No task-level retries; the run is cut off after 100 minutes.
    retries=0,
    execution_timeout=timedelta(minutes=100),
    pool_slots=2,
    # --- cluster resources --------------------------------------------------
    master='yarn',
    yarn_queue='pro',
    driver_cores=2,
    driver_memory='8G',
    executor_cores=5,
    executor_memory='12G',
    num_executors=70,
    # --- Spark settings -----------------------------------------------------
    conf={
        # Dynamic allocation, capped at 90 executors.
        'spark.dynamicAllocation.enabled': 'true',
        'spark.shuffle.service.enabled': 'true',
        'spark.dynamicAllocation.maxExecutors': 90,
        'spark.dynamicAllocation.cachedExecutorIdleTimeout': 120,
        # Overwrite only the partitions that the query actually writes.
        'spark.sql.sources.partitionOverwriteMode': 'dynamic',
        'spark.executor.memoryOverhead': '4G',
        # A single YARN application attempt.
        'spark.yarn.maxAppAttempts': 1,
        'spark.sql.shuffle.partitions': 2000,
        'spark.io.compression.codec': 'org.apache.spark.io.ZStdCompressionCodec',
        # Disabled tuning options, kept for reference:
        # 'spark.shuffle.memoryFraction': '0.8',
        # 'spark.shuffle.file.buffer': '64k',
        # 'spark.executor.extraJavaOptions': '-XX:+UseG1GC -XX:ParallelGCThreads=4 -XX:ConcGCThreads=2'
    },
    # --- Hive settings ------------------------------------------------------
    hiveconf={
        # Dynamic partitioning, non-strict mode.
        'hive.exec.dynamic.partition': 'true',
        'hive.exec.dynamic.partition.mode': 'nonstrict',
        # Maximum number of dynamic partitions (overall and per node).
        'hive.exec.max.dynamic.partitions': 300,
        'hive.exec.max.dynamic.partitions.pernode': 300,
    },
)
# Upstream: the ODS transfer-bill task and the 04:00 time sensor.
spmi_dwd__dwd_spmi_transfer_bill_new_dt_b.set_upstream(
    [spmi_ods__spmi_transfer_bill, time_after_04_00]
)


# No-op join task representing the whole dwd_spmi_transfer_bill_new_dt table.
# It does no work itself; it is placed downstream of the "_a" and "_b" Spark
# SQL tasks and carries the 5-hour SLA.
spmi_dwd__dwd_spmi_transfer_bill_new_dt = DummyOperator(
    task_id='spmi_dwd__dwd_spmi_transfer_bill_new_dt',
    email=['lukunming@jtexpress.com', 'yl_bigdata@yl-scm.com'],
    retries=0,
    priority_weight=0,
    sla=timedelta(hours=5),
)
# Both the "_a" and "_b" tasks must be upstream so this join task only succeeds
# after each of them has finished (listing "_b" twice would leave "_a"
# unconnected and let the join task complete without it).
spmi_dwd__dwd_spmi_transfer_bill_new_dt << [
    spmi_dwd__dwd_spmi_transfer_bill_new_dt_a,
    spmi_dwd__dwd_spmi_transfer_bill_new_dt_b,
]